#!/usr/bin/env python3


#### How long was the editor's user page when the editor made this edit?
##
## Takes in post-processed wikiq data and each of our samples in turn.
## Matches each revision to the most recent prior revision of the editor's user page
## (this sounds like a job for a window function, but maybe not),
## adds the length of the user page as a new field,
## and saves out the new dataset.
##
## Have 3 samples? Run it 3x!
##
##


import sys
# add pyspark to your python path e.g.
#sys.path.append("/home/nathante/sparkstuff/spark/python/pyspark")
#sys.path.append("/home/nathante/sparkstuff/spark/python/")
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import Window
import pyspark.sql.functions as f
from pyspark.sql import types
from pyspark.sql.types import IntegerType, StringType, DateType
import argparse
import glob
from os import mkdir
from os import path

### Location of the user-page revision data, set as a module-level global.
### The main script reads this directory as parquet; it is hard-coded rather
### than exposed as a command-line option.
upDataPath = '/gscratch/comdata/users/kaylea/taboo/processed_data/postproc_wikiqKHC_enwiki_userpages_202109/'



def parse_args(argv=None) -> argparse.Namespace:
    """Parse the command-line options of the userpage-length job.

    Args:
        argv: Optional list of argument strings to parse. When left as
            ``None`` (the default) argparse falls back to ``sys.argv[1:]``,
            which is the behaviour existing callers rely on.

    Returns:
        The parsed namespace with the attributes ``input_dir`` (required),
        ``output_dir``, ``output_format`` and ``num_partitions``.
    """
    parser = argparse.ArgumentParser(description='Create a dataset of userpage edits.')
    # e.g. /gscratch/comdata/users/kaylea/
    parser.add_argument('-i', '--input_dir', help='Path to revision data.', required=True, type=str)
    parser.add_argument('-o', '--output_dir', help='Output directory', default='./revDataWithUPLength', type=str)
    parser.add_argument('--output_format', help="[csv, parquet] format to output", type=str, default='csv')
    parser.add_argument('--num_partitions', help="number of partitions to output", type=int, default=100)
    return parser.parse_args(argv)

if __name__ == "__main__":
    ## Entry point: annotate every revision of the input sample with the most
    ## recent revision of the editor's user page made at or before that
    ## revision, then write the combined dataset to the output directory.
    args = parse_args()
    conf = SparkConf().setAppName("Userpage Length At Edit Moment Spark")
    ## hand the conf to the builder -- otherwise the app name set above is never used
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    inputDir = args.input_dir

    ## get userpage length info
    ## (no orderBy here: row order does not survive the join/repartition below,
    ## so a global sort would only cost a shuffle)
    upDF = spark.read.parquet(upDataPath)
    ## only need a few fields. 'revid' in particular has to go so that the
    ## sample's own 'revid' is the only one left after the join.
    unneededColumns = [
        'revid', 'anon', 'articleid', 'collapsed_revs', 'deleted',
        'editor', 'editor_id', 'minor', 'revert', 'reverteds', 'sha1',
        'editor_id_or_ip', 'reverted_by', 'editor_nth_revert_action',
        'editor_nth_revert', 'article_nth_revert', 'year', 'month',
        'editor_nth_edit_nocollapse', 'article_nth_edit_nocollapse',
    ]
    upDF = upDF.drop(*unneededColumns)
    ## prefix the fields we keep so they are recognisable next to the revision fields
    renamedColumns = (
        ('date_time', 'userpage_date_time'),
        ('namespace', 'userpage_namespace'),
        ('text_chars', 'userpage_text_chars'),
        ('title', 'userpage_title'),
    )
    for oldName, newName in renamedColumns:
        upDF = upDF.withColumnRenamed(oldName, newName)
    upDF.show()  # show() prints the preview itself and returns None, so no print()

    ## load the revision sample (tsv) and
    ## make new column: user page name for the editor
    df = spark.read.csv(inputDir, sep='\t', inferSchema=True, header=True, mode="PERMISSIVE")
    ## drop the first three characters of the editor field, then prepend the
    ## encoded user-namespace prefix so the result lines up with userpage_title
    ## (presumably the three characters are a leading %22 -- TODO confirm against the data)
    df = df.withColumn('editorPlain', f.expr("substring(editor, 4, length(editor)-3)"))
    df = df.withColumn('editorUserpage', f.concat(f.lit('%22User%3A'), f.col('editorPlain')))
    df.printSchema()
    upDF.printSchema()

    ## idea: join every *earlier-or-simultaneous* edit of the user page to the
    ## revision, then keep only the latest of them.
    ## The time condition is part of the left join (instead of a filter applied
    ## afterwards) so that a revision whose editor only created a user page
    ## later is kept, with null userpage_* fields, rather than dropped.
    ## Full timestamps are compared rather than calendar days, so a user page
    ## edit made later on the same day does not count as "prior".
    revTime = f.to_timestamp(df.date_time)
    upTime = f.to_timestamp(upDF.userpage_date_time)
    fullDF = df.join(
        upDF,
        (df.editorUserpage == upDF.userpage_title) & (upTime <= revTime),
        how='left',
    )
    df.show()
    fullDF.show()
    fullDF.printSchema()

    ## delta = whole days between the matched user page edit and the revision;
    ## 0 when the editor had no user page (yet), i.e. nothing was joined in
    fullDF = fullDF.withColumn(
        "delta",
        f.when(
            fullDF.userpage_date_time.isNotNull(),
            f.datediff(f.to_date(fullDF.date_time), f.to_date(fullDF.userpage_date_time)),
        ).otherwise(0),
    )
    ## true test -- does this generate a decent number of 0-length user page
    ## entries? Do we have solid proportions of anons?
    fullDF.show()

    ## each revision from the sample needs to appear only once: rank its
    ## candidate user page edits newest-first and keep the top one. The second
    ## sort key only makes the choice repeatable when two user page edits
    ## carry the same timestamp.
    w = Window.partitionBy("revid").orderBy(
        f.to_timestamp(f.col("userpage_date_time")).desc(),
        f.col("userpage_text_chars").desc(),
    )
    fullDF = fullDF.withColumn("upRank", f.row_number().over(w))
    fullDF = fullDF.filter("upRank == 1").drop("upRank")
    ## minDelta always equals delta for the surviving row; the column is kept
    ## so the output schema stays the same as before
    fullDF = fullDF.withColumn("minDelta", f.col("delta"))
    fullDF.show()

    ## one row per revid is already guaranteed above, so no dropDuplicates pass is needed
    fullDF = fullDF.repartition(args.num_partitions)

    if not path.exists(args.output_dir):
        mkdir(args.output_dir)
    if args.output_format in ("csv", "tsv"):
        fullDF.write.csv(args.output_dir, sep='\t', mode='overwrite', header=True, timestampFormat="yyyy-MM-dd HH:mm:ss")
    else:
        ## any other value of --output_format is written as parquet
        print("outputting")
        fullDF.write.parquet(args.output_dir, mode='overwrite')

